{ "cells": [ { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "# Anonlink Entity Service API\n", "\n", "This tutorial demonstrates directly interacting with the entity service via the REST API. The primary alternative is to use\n", "a library or command line tool such as [`anonlink-client`](https://anonlink-client.readthedocs.io/) which can handle the communication with the anonlink entity service.\n", "\n", "### Dependencies\n", "\n", "In this tutorial we interact with the REST API using the `requests` Python library. Additionally we use the `clkhash` Python library to define the linkage schema and to encode the PII. The synthetic dataset comes from the `recordlinkage` package. All the dependencies can be installed with pip:\n", "\n", "```\n", "pip install requests clkhash recordlinkage\n", "```\n", "\n", "\n", "### Steps\n", "\n", "* Check connection to Anonlink Entity Service\n", "* Synthetic Data generation and encoding\n", "* Create a new linkage project\n", "* Upload the encodings\n", "* Create a run\n", "* Retrieve and analyse results" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "pycharm": { "is_executing": false, "name": "#%%\n" } }, "outputs": [], "source": [ "import json\n", "import os\n", "import time\n", "import requests\n", "\n", "from IPython.display import clear_output" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Check Connection\n", "\n", "If you are connecting to a custom entity service, change the address here." 
] }, { "cell_type": "code", "execution_count": 2, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Testing anonlink-entity-service hosted at https://anonlink.easd.data61.xyz/api/v1/\n" ] } ], "source": [ "server = os.getenv(\"SERVER\", \"https://anonlink.easd.data61.xyz\")\n", "url = server + \"/api/v1/\"\n", "print(f'Testing anonlink-entity-service hosted at {url}')" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "pycharm": { "is_executing": false, "name": "#%%\n" } }, "outputs": [ { "data": { "text/plain": [ "{'project_count': 2, 'rate': 9129125, 'status': 'ok'}" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "requests.get(url + 'status').json()" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Data preparation\n", "\n", "This section won't be explained in great detail as it directly follows the \n", "[clkhash tutorials](http://clkhash.readthedocs.io/en/latest/).\n", "\n", "We encode a synthetic dataset from the `recordlinkage` library using `clkhash`." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "from tempfile import NamedTemporaryFile\n", "from recordlinkage.datasets import load_febrl4" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "dfA, dfB = load_febrl4()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "pycharm": { "is_executing": false, "name": "#%%\n" } }, "outputs": [], "source": [ "with open('a.csv', 'w') as a_csv:\n", " dfA.to_csv(a_csv, line_terminator='\\n')\n", "\n", "with open('b.csv', 'w') as b_csv: \n", " dfB.to_csv(b_csv, line_terminator='\\n')" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Schema Preparation\n", "\n", "The linkage schema must be agreed on by the two parties. A hashing schema instructs `clkhash` how to treat each column for encoding PII into CLKs. A detailed description of the hashing schema can be found in the [clkhash documentation](https://clkhash.readthedocs.io/en/latest/schema.html).\n", "\n", "A linkage schema can either be defined as Python code as shown here, or as a JSON file (shown in other tutorials). The importance of each field is controlled by the `k` parameter in the `FieldHashingProperties`.\n", "We ignore the record id and social security id fields so they won't be incorporated into the encoding." 
] }, { "cell_type": "code", "execution_count": 7, "metadata": { "pycharm": { "is_executing": false, "name": "#%%\n" } }, "outputs": [], "source": [ "import clkhash\n", "from clkhash.comparators import *\n", "from clkhash.field_formats import *\n", "schema = clkhash.randomnames.NameList.SCHEMA\n", "_missing = MissingValueSpec(sentinel='')\n", "schema.fields = [\n", " Ignore('rec_id'),\n", " StringSpec('given_name',\n", " FieldHashingProperties(\n", " NgramComparison(2),\n", " BitsPerTokenStrategy(15))),\n", " StringSpec('surname',\n", " FieldHashingProperties(\n", " NgramComparison(2),\n", " BitsPerTokenStrategy(15))),\n", " IntegerSpec('street_number',\n", " FieldHashingProperties(\n", " NgramComparison(1, positional=True),\n", " BitsPerTokenStrategy(15),\n", " missing_value=_missing)),\n", " StringSpec('address_1',\n", " FieldHashingProperties(\n", " NgramComparison(2),\n", " BitsPerTokenStrategy(15))),\n", " StringSpec('address_2',\n", " FieldHashingProperties(\n", " NgramComparison(2),\n", " BitsPerTokenStrategy(15))),\n", " StringSpec('suburb',\n", " FieldHashingProperties(\n", " NgramComparison(2),\n", " BitsPerTokenStrategy(15))),\n", " IntegerSpec('postcode',\n", " FieldHashingProperties(\n", " NgramComparison(1, positional=True),\n", " BitsPerTokenStrategy(15))),\n", " StringSpec('state',\n", " FieldHashingProperties(\n", " NgramComparison(2),\n", " BitsPerTokenStrategy(15))),\n", " IntegerSpec('date_of_birth',\n", " FieldHashingProperties(\n", " NgramComparison(1, positional=True),\n", " BitsPerTokenStrategy(15),\n", " missing_value=_missing)),\n", " Ignore('soc_sec_id')\n", "]" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Encoding\n", "\n", "We now transform the *raw* personally identifiable information (PII) into CLK encodings following the defined schema. See the [clkhash](https://clkhash.readthedocs.io/) documentation for further details on this." 
] }, { "cell_type": "code", "execution_count": 8, "metadata": { "pycharm": { "is_executing": false, "name": "#%%\n" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "generating CLKs: 100%|██████████| 5.00k/5.00k [00:01<00:00, 2.84kclk/s, mean=643, std=45.7]\n", "generating CLKs: 100%|██████████| 5.00k/5.00k [00:01<00:00, 3.16kclk/s, mean=631, std=52.9]\n" ] } ], "source": [ "from clkhash import clk\n", "with open('a.csv') as a_pii:\n", " hashed_data_a = clk.generate_clk_from_csv(a_pii, 'secret', schema, validate=False)\n", "with open('clks_a.json', 'w') as f:\n", " json.dump({'clks': hashed_data_a}, f)\n", "\n", "with open('b.csv') as b_pii:\n", " hashed_data_b = clk.generate_clk_from_csv(b_pii, 'secret', schema, validate=False)\n", "with open('clks_b.json', 'w') as f:\n", " json.dump({'clks': hashed_data_b}, f)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Create Linkage Project\n", "\n", "The analyst carrying out the linkage starts by creating a linkage project of the desired output type with the Entity Service.\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "pycharm": { "is_executing": false, "name": "#%%\n" } }, "outputs": [ { "data": { "text/plain": [ "{'project_id': '277ecc511e18dc10382b77f711dfbcf57f028405d57e650c',\n", " 'result_token': '7878f100f21bfb041f2b7411ac8eba7246fd43f08586c8c7',\n", " 'update_tokens': ['81984628f5e0178cccfae34207581836f8cc1e39f92f9532',\n", " '3b13b1528d084baa51c2d25f715b3701cccb3f292c561eb9']}" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "project_spec = {\n", " \"schema\": {},\n", " \"result_type\": \"groups\",\n", " \"number_parties\": 2,\n", " \"name\": \"API Tutorial Test\"\n", "}\n", "credentials = requests.post(url + 'projects', json=project_spec).json()\n", "\n", "project_id = credentials['project_id']\n", "a_token, b_token = credentials['update_tokens']\n", "credentials" ] }, { 
"cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "The server returns a `project_id`, a `result_token` and a set of `update_tokens`, one for each data provider.\n", "- The `project_id` references the project uniquely on the server.\n", "- The `result_token` authorises project API requests, i.e., access to the result of the linkage.\n", "- The `update_tokens` authorise the data upload. There is one `update_token` for each data provider, and each token can only be used once.\n", "\n", "**Note:** the analyst will need to pass on the `project_id` (the\n", "id of the linkage project) and one of the `update_tokens` to\n", "each data provider.\n", "\n", "The `result_token` can also be used to carry out project API requests:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "pycharm": { "is_executing": false, "name": "#%%\n" } }, "outputs": [ { "data": { "text/plain": [ "{'error': False,\n", " 'name': 'API Tutorial Test',\n", " 'notes': '',\n", " 'number_parties': 2,\n", " 'parties_contributed': 0,\n", " 'project_id': '277ecc511e18dc10382b77f711dfbcf57f028405d57e650c',\n", " 'result_type': 'groups',\n", " 'schema': {},\n", " 'uses_blocking': False}" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "requests.get(url + 'projects/{}'.format(project_id), \n", " headers={\"Authorization\": credentials['result_token']}).json()" ] }, { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "Now the two clients can upload their data providing the appropriate *upload tokens*." ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## CLK Upload\n", "there are currently two different ways of uploading CLKs to the entity server.\n", "\n", "### Method 1: Direct Upload\n", "The 'clks' endpoint accepts CLKs in both json and binary format. 
However, this method is not recommended for large datasets, as uploads cannot be resumed and might time out." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "a_response = requests.post(\n", " '{}projects/{}/clks'.format(url, project_id),\n", " json={'clks': hashed_data_a},\n", " headers={\"Authorization\": a_token}\n", ").json()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "b_response = requests.post(\n", " '{}projects/{}/clks'.format(url, project_id),\n", " json={'clks': hashed_data_b},\n", " headers={\"Authorization\": b_token}\n", ").json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Method 2: Upload to Object Store\n", "The entity service can be deployed with an object store, which the data providers can use to upload their CLKs.\n", "First, each data provider requests a set of temporary credentials that authorise the upload to the object store. The returned Temporary Object Store Credentials can be used with any S3-compatible client, for example boto3 or the Minio client in Python. The credentials only allow uploading data to a particular path in a particular bucket for a finite period (defaulting to 12 hours).\n", "After uploading the data, the client informs the entity service.\n", "\n", "Note that this feature may be disabled by the administrator, in which case the endpoint will return a 500 server error." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Upload party A: OK\n", "Upload party B: OK\n" ] } ], "source": [ "from minio import Minio\n", "\n", "upload_response = requests.get(\n", " url + 'projects/{}/authorize-external-upload'.format(project_id),\n", " headers={'Authorization': a_token},\n", ").json()\n", "\n", "upload_credentials = upload_response['credentials']\n", "upload_info = upload_response['upload']\n", "\n", "# Use Minio python client to upload data\n", "mc = Minio(\n", " upload_info['endpoint'],\n", " access_key=upload_credentials['AccessKeyId'],\n", " secret_key=upload_credentials['SecretAccessKey'],\n", " session_token=upload_credentials['SessionToken'],\n", " region='us-east-1',\n", " secure=upload_info['secure']\n", " )\n", "\n", "etag = mc.fput_object(\n", " upload_info['bucket'],\n", " upload_info['path'] + \"/clks_a.json\",\n", " 'clks_a.json',\n", " metadata={\n", " \"hash-count\": 5000,\n", " \"hash-size\": 128\n", " })\n", "\n", "# Should be able to notify the service that we've uploaded data\n", "res = requests.post(url + f\"projects/{project_id}/clks\",\n", " headers={'Authorization': a_token},\n", " json={\n", " 'encodings': {\n", " 'file': {\n", " 'bucket': upload_info['bucket'],\n", " 'path': upload_info['path'] + \"/clks_a.json\",\n", " }\n", " }\n", " })\n", "print(f'Upload party A: {\"OK\" if res.status_code == 201 else \"ERROR\"}')\n", "#party B:\n", "upload_response = requests.get(\n", " url + 'projects/{}/authorize-external-upload'.format(project_id),\n", " headers={'Authorization': b_token},\n", ").json()\n", "\n", "upload_credentials = upload_response['credentials']\n", "upload_info = upload_response['upload']\n", "\n", "# Use Minio python client to upload data\n", "mc = Minio(\n", " upload_info['endpoint'],\n", " access_key=upload_credentials['AccessKeyId'],\n", " 
 secret_key=upload_credentials['SecretAccessKey'],\n", " session_token=upload_credentials['SessionToken'],\n", " region='us-east-1',\n", " secure=upload_info['secure']\n", " )\n", "\n", "etag = mc.fput_object(\n", " upload_info['bucket'],\n", " upload_info['path'] + \"/clks_b.json\",\n", " 'clks_b.json',\n", " metadata={\n", " \"hash-count\": 5000,\n", " \"hash-size\": 128\n", " })\n", "\n", "# Should be able to notify the service that we've uploaded data\n", "res = requests.post(url + f\"projects/{project_id}/clks\",\n", " headers={'Authorization': b_token},\n", " json={\n", " 'encodings': {\n", " 'file': {\n", " 'bucket': upload_info['bucket'],\n", " 'path': upload_info['path'] + \"/clks_b.json\",\n", " }\n", " }\n", " })\n", "print(f'Upload party B: {\"OK\" if res.status_code == 201 else \"ERROR\"}')" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "is_executing": false } }, "source": [ "Every upload gets a receipt token. In some operating modes this receipt is required to access the results." ] }, { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "## Create a run\n", "\n", "Now that the project has been created and the CLK encodings have been uploaded, we can carry out some privacy-preserving record linkage. The same encoding data can be linked using different threshold values by creating **runs**." 
] }, { "cell_type": "code", "execution_count": 12, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "run_response = requests.post(\n", " \"{}projects/{}/runs\".format(url, project_id),\n", " headers={\"Authorization\": credentials['result_token']},\n", " json={\n", " 'threshold': 0.80,\n", " 'name': \"Tutorial Run #1\"\n", " }\n", ").json()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "run_id = run_response['run_id']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run Status" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [ { "data": { "text/plain": [ "{'current_stage': {'description': 'waiting for CLKs',\n", " 'number': 1,\n", " 'progress': {'absolute': 2,\n", " 'description': 'number of parties already contributed',\n", " 'relative': 1.0}},\n", " 'stages': 3,\n", " 'state': 'created',\n", " 'time_added': '2020-06-15T05:41:44.177808',\n", " 'time_started': None}" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "requests.get(\n", " '{}projects/{}/runs/{}/status'.format(url, project_id, run_id),\n", " headers={\"Authorization\": credentials['result_token']}\n", " ).json()" ] }, { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "## Results\n", "\n", "After some delay (depending on the size of the datasets) we can fetch the results. This can be done by polling the REST API directly with `requests`; for simplicity, we instead use the `watch_run_status` method of the `RestClient` class provided in `anonlinkclient.rest_client`.\n", "\n", "> Note that `server` is passed to `RestClient` rather than `url`." 
] }, { "cell_type": "code", "execution_count": 15, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "State: completed\n", "Stage (3/3): compute output\n" ] } ], "source": [ "from anonlinkclient.rest_client import RestClient, format_run_status\n", "rest_client = RestClient(server)\n", "\n", "for update in rest_client.watch_run_status(project_id, run_id, credentials['result_token'], timeout=300):\n", " clear_output(wait=True)\n", " print(format_run_status(update))" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "data = json.loads(rest_client.run_get_result_text(\n", " project_id, \n", " run_id, \n", " credentials['result_token']))" ] }, { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "This result is the 1-1 mapping between rows that were more similar than the given threshold." ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "a[174] maps to b[4485]\n", "a[2570] maps to b[3737]\n", "a[1920] maps to b[4157]\n", "a[420] maps to b[4323]\n", "a[136] maps to b[1416]\n", "a[3090] maps to b[4797]\n", "a[2940] maps to b[3663]\n", "a[2228] maps to b[1095]\n", "a[2623] maps to b[3447]\n", "a[672] maps to b[2795]\n", "...\n" ] } ], "source": [ "for i in range(10):\n", " ((_, a_index), (_, b_index)) = sorted(data['groups'][i])\n", " print(\"a[{}] maps to b[{}]\".format(a_index, b_index))\n", "print(\"...\")" ] }, { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "In this dataset there are 5000 records in common. 
With the chosen threshold and schema we currently retrieve:" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [ { "data": { "text/plain": [ "4842" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "len(data['groups'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup\n", "\n", "If you want, you can delete the run and project from the anonlink-entity-service." ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "requests.delete(\"{}projects/{}\".format(url, project_id), headers={\"Authorization\": credentials['result_token']})" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }